66180853b90c213c5cae5e5de02f499b437d609e,xap-core/xap-datagrid/src/main/java/com/gigaspaces/internal/cluster/node/impl/backlog/multisourcesinglefile/MultiSourceSingleFileReliableAsyncGroupBacklog.java,MultiSourceSingleFileReliableAsyncGroupBacklog,updateReliableAsyncState,#IReliableAsyncState#String#,195

Before Change


            MultiSourceSingleFileConfirmationHolder lastConfirmedBySource = getConfirmationHolderUnsafe(sourceMemberName);
            if (!lastConfirmedBySource.hadAnyHandshake()
                    || lastConfirmedBySource.getLastConfirmedKey() < minConfirmed) {
                decreaseWeight(sourceMemberName, lastConfirmedBySource.getLastConfirmedKey(), minConfirmed);
                lastConfirmedBySource.setLastConfirmedKey(minConfirmed);
            }
            clearConfirmedPackets();
        } finally {

After Change


    }

    @Override
    public void updateReliableAsyncState(IReliableAsyncState reliableAsyncState, String sourceMemberName) throws NoSuchReplicationMemberException, MissingReliableAsyncTargetStateException {
        MultiSourceSingleFileReliableAsyncState sharedAsyncState = (MultiSourceSingleFileReliableAsyncState) reliableAsyncState;
        _rwLock.writeLock().lock();
        try {
            if (_logger.isLoggable(Level.FINEST))
                _logger.finest(getLogPrefix()
                        + "incoming reliable async state update "
                        + Arrays.toString(sharedAsyncState.getAsyncTargetsState()));

            ReliableAsyncSourceGroupConfig sourceGroupConfig = getGroupConfigSnapshot();

            //This must be done before updating any member state, otherwise it can cause deletion of packets
            //that are not confirmed by the unknown member
            validateReliableAsyncUpdateTargetsMatch(reliableAsyncState, sourceMemberName);

            for (MultiSourceAsyncTargetState asyncTargetState : sharedAsyncState.getAsyncTargetsState()) {
                if (asyncTargetState.hadAnyHandshake()) {
                    long lastConfirmedKey = asyncTargetState.getLastConfirmedKey();
                    long lastReceivedKey = asyncTargetState.getLastReceivedKey();
                    String targetMemberName = asyncTargetState.getTargetMemberName();
                    final MultiSourceSingleFileConfirmationHolder confirmationHolder = getConfirmationHolderUnsafe(targetMemberName);
                    confirmationHolder.setLastConfirmedKey(lastConfirmedKey, targetMemberName, this);
                    confirmationHolder.setLastReceivedKey(lastReceivedKey);
                }
            }
            long minConfirmed = getNextKeyUnsafe() - 1;
            // We update all the sync keepers state to the minimum confirmed key
            // such that in case of a failover they will receive the proper
            // backlog
            if (sharedAsyncState.getAsyncTargetsState().length > 0) {
                minConfirmed = sharedAsyncState.getMinimumUnconfirmedKey() - 1;
                if (getNextKeyUnsafe() < minConfirmed + 1)
                    setNextKeyUnsafe(minConfirmed + 1);
            }

            MultiSourceSingleFileConfirmationHolder lastConfirmedBySource = getConfirmationHolderUnsafe(sourceMemberName);
            if (!lastConfirmedBySource.hadAnyHandshake()
                    || lastConfirmedBySource.getLastConfirmedKey() < minConfirmed) {
                lastConfirmedBySource.setLastConfirmedKey(minConfirmed, sourceMemberName, this);
            }
            clearConfirmedPackets();
        } finally {